package com.data.dev.flink.keleiTopic.operationForPortScan;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaKelaiTopic.KelaiMsg;
import com.data.dev.common.javabean.kafkaKelaiTopic.KeleiMsgList;
import com.data.dev.utils.MsgToJavaBeanUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

 import java.util.Collections;

/**
 * Flink {@link FlatMapFunction} that turns an incoming {@link KeleiMsgList} into
 * {@link KelaiMsg} records (original intent: flatten the multiple events carried
 * by one message).
 *
 * <p>Created 2022-06-17 15:18:52.
 *
 * <p>NOTE(review): despite the stated intent, this implementation emits at most
 * ONE record per input. The {@code toString()} text of the list, with every
 * {@code '['} and {@code ']'} character removed, is handed to
 * {@link MsgToJavaBeanUtils#getKelaiMsgBean} as a single string. If the input can
 * really carry several events, or if a message contains nested arrays, the
 * payload should be split with a JSON parser instead of string replacement —
 * confirm against the {@code KeleiMsgList} definition.
 *
 * @author wangxiaoming-ghq
 */
@Slf4j
public class keleiJsonArrayToKeleiFlatMapper extends BaseBean implements FlatMapFunction<KeleiMsgList, KelaiMsg> {

    /**
     * Converts one {@link KeleiMsgList} into a {@link KelaiMsg} and forwards it downstream.
     *
     * @param keleiMsgList      the incoming message list; a {@code null} input is logged and skipped
     * @param keleiMsgCollector the collector that receives the converted message
     */
    @Override
    public void flatMap(KeleiMsgList keleiMsgList, Collector<KelaiMsg> keleiMsgCollector) {
        // Guard against a null input so a single bad record does not fail with an NPE.
        if (keleiMsgList == null) {
            log.warn("Received null KeleiMsgList, skipping");
            return;
        }

        // Strip all square brackets from the textual form before conversion.
        // The original code wrapped this value in a single-element list and looped
        // over it; the loop always ran exactly once, so it is written straight-line.
        String kelaiMsgJson = keleiMsgList.toString().replace("[", "").replace("]", "");

        KelaiMsg kelaiMsg = MsgToJavaBeanUtils.getKelaiMsgBean(kelaiMsgJson);
        // Do not forward a null record downstream
        // (presumably returned when conversion fails — verify against MsgToJavaBeanUtils).
        if (kelaiMsg == null) {
            log.warn("Could not convert message to KelaiMsg, skipping: {}", kelaiMsgJson);
            return;
        }

        log.info("提取到的单条信息：{}", kelaiMsg);
        keleiMsgCollector.collect(kelaiMsg);
    }
}